Deeper Customisation with Parser

Hidden Function

The parser is the function that generates the jobscript header. It takes your resources and returns a list of lines that go at the top of your script.

Up until now, we have been relying on the internal “default” parser. It iterates over the stored Resource objects and adds one line per resource to a list, following the schema "{pragma}{tag}{flag}{separator}{value}".

Note

tag and separator are configurable keyword arguments of Resource, defaulting to “--” and “=”, respectively. You can also set them at the Computer level, as demonstrated below.
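The schema can be sketched as a plain format string. This is an illustration, not remotemanager's implementation: format_resource_line is a hypothetical helper, and the single space after the pragma is an assumption inferred from the jobscripts printed in this tutorial.

```python
def format_resource_line(pragma, flag, value, tag="--", separator="="):
    """Build one jobscript line from its parts (hypothetical helper)."""
    # Assumption: a single space separates the pragma from the tag.
    return f"{pragma} {tag}{flag}{separator}{value}"


# Default tag and separator, as used for Slurm-style lines.
slurm_line = format_resource_line("#SBATCH", "nodes", 1)
# Overridden tag and separator, as a PBS-style line would need.
pbs_line = format_resource_line("#PBS", "N", "myjob", tag="-", separator=" ")

print(slurm_line)
print(pbs_line)
```

Changing only tag and separator is enough for simple one-flag-per-line formats. Anything that merges several resources into one line needs a custom parser.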

But what if this doesn’t work for your machine? It certainly doesn’t work for PBS-based machines.

That is where we need to specify our own parser.

This tutorial explains how to define a parser, with the end goal of generating a PBS-friendly jobscript. The same concepts apply to any other scheduler.

[1]:
from remotemanager.connection.computers import BaseComputer, Resource

class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.submitter = "sbatch"
        self.shebang = "#!/bin/bash"
        self.pragma = "#SBATCH"

        self.mpi = Resource(name="mpi", flag="ntasks", min=1)
        self.omp = Resource(name="omp", flag="cpus-per-task", min=1, max=64)
        self.nodes = Resource(name="nodes", flag="nodes", optional=False)
        self.time = Resource(name="time", flag="walltime", optional=False, format="time", default=3600)
[2]:
test = Computer()

test.nodes = 1

print(test.script())
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --walltime=01:00:00
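The walltime line above shows the default of 3600 rendered as 01:00:00. A minimal sketch of that kind of conversion follows, assuming format="time" means seconds converted to HH:MM:SS. seconds_to_walltime is a hypothetical helper, not part of remotemanager.

```python
def seconds_to_walltime(seconds):
    """Convert a number of seconds to an HH:MM:SS string."""
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    # Hours are not wrapped at 24, since walltimes can exceed one day.
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


walltime = seconds_to_walltime(3600)
print(walltime)
```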

On PBS-based machines, the resource request is expected on a single line of the form:

#PBS -l nodes=1:ppn=4,walltime=01:00:00

Additionally, the OpenMP thread count is usually set through an environment variable (OMP_NUM_THREADS) rather than a resource line.

With our default behaviour of putting one resource per line, this is obviously not going to work.

So we need to specify our own parser. Let’s begin by explicitly defining the current behaviour:

[3]:
class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.submitter = "sbatch"
        self.shebang = "#!/bin/bash"
        self.pragma = "#SBATCH"

        self.mpi = Resource(name="mpi_per_node", flag="ppn", default=4)
        self.omp = Resource(name="omp", flag="cpus-per-task", default=4)
        self.nodes = Resource(name="nodes", flag="nodes", default=1)
        self.time = Resource(name="time", flag="walltime", format="time", optional=False, default=3600)

    def parser(self, resources: "Resources") -> list:
        output = []
        for resource in resources:
            if resource:
                output.append(resource.resource_line)

        return output
[4]:
test = Computer()
print(test.script())
#!/bin/bash
#SBATCH --ppn=4
#SBATCH --cpus-per-task=4
#SBATCH --nodes=1
#SBATCH --walltime=01:00:00

Parser “gotchas”

There are a few things to note here.

resources

The resources argument receives a Resources object. As you’d imagine, this is a special carrier object that holds our Resource objects.

Note

Despite the similar name, Resources and Resource are not the same object. The former is a collection of the latter.

It functions like a list, with some extra functions. The important thing to note is that you can iterate over it, just like a list.

for resource in resources: will give you each resource, one by one.

Note

You can also access a Resource by its name, like you would a dictionary. See the section below for info.
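To make the list-like and dict-like behaviour concrete, here is a minimal sketch of a container that supports both. ResourceSketch and ResourcesSketch are hypothetical stand-ins written for this illustration. They are not the remotemanager classes, whose internals may differ.

```python
class ResourceSketch:
    """Hypothetical stand-in for a Resource: a name, a flag and a value."""

    def __init__(self, name, flag, value=None):
        self.name = name
        self.flag = flag
        self.value = value


class ResourcesSketch:
    """Hypothetical stand-in for Resources: iterable, with lookup by name."""

    def __init__(self, items):
        self._items = list(items)

    def __iter__(self):
        # Iteration yields each stored resource in insertion order.
        return iter(self._items)

    def __getitem__(self, name):
        # Lookup uses the resource's name, not the attribute it was assigned to.
        for item in self._items:
            if item.name == name:
                return item
        raise KeyError(name)


resources = ResourcesSketch([
    ResourceSketch("mpi_per_node", "ppn", 4),
    ResourceSketch("nodes", "nodes", 1),
])

names = [resource.name for resource in resources]
ppn = resources["mpi_per_node"].value
print(names, ppn)
```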

if resource

bool(resource) evaluates to True only if the resource has both a value and a flag. The if resource check therefore keeps resources without a value out of the jobscript.

Omitting it usually results in a line like #SBATCH --ntasks=None.

[5]:
a = Resource(name="a", flag="resource")
print("A resource without a value will evaluate to False:", bool(a))

a.value = "test"
print("When given a value, it will then evaluate to True:", bool(a))
A resource without a value will evaluate to False: False
When given a value, it will then evaluate to True: True
[6]:
b = Resource(name="a")
print("A resource without a flag will always evaluate to False:", bool(b))

b.value = "test"
print("When given a value, it will still evaluate to False:", bool(b))
A resource without a flag will always evaluate to False: False
When given a value, it will still evaluate to False: False
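The truthiness rule shown in the two cells above can be sketched with a custom __bool__. FlaggedValue is a hypothetical stand-in. The "is not None" checks are an assumption about how missing flags and values are detected, and the real Resource class may use different rules.

```python
class FlaggedValue:
    """Hypothetical stand-in for a Resource, reduced to a flag and a value."""

    def __init__(self, flag=None, value=None):
        self.flag = flag
        self.value = value

    def __bool__(self):
        # Truthy only when both a flag and a value are present.
        return self.flag is not None and self.value is not None


candidates = [
    FlaggedValue(flag="ntasks", value=8),
    FlaggedValue(flag="nodes"),   # no value: filtered out
    FlaggedValue(value="test"),   # no flag: filtered out
]

# The same "if resource" filter used in the parser loop.
lines = [f"#SBATCH --{c.flag}={c.value}" for c in candidates if c]
print(lines)
```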

return type

The parser should always return a list of lines, not a string.
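One reason this matters in plain Python: code that expects a list of lines will iterate a bare string character by character. The sketch below shows that effect with a hypothetical assemble_script helper. It assumes lines are joined with newlines, and it does not claim that remotemanager fails in exactly this way.

```python
def assemble_script(shebang, parser_lines):
    """Join a shebang and parser output into script text (hypothetical)."""
    # Assumption: each entry of parser_lines becomes one line of the script.
    return "\n".join([shebang, *parser_lines])


# A list of lines is assembled as intended.
good = assemble_script("#!/bin/bash", ["#PBS -l nodes=1", "cd $PBS_O_WORKDIR"])

# A bare string is unpacked one character at a time, mangling the output.
bad = assemble_script("#!/bin/bash", "#PBS")

print(good)
print(repr(bad))
```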

Making our PBS parser

Now that we’ve got that down, we can start editing our parser to work as we want.

Since we want to combine the resources mpi_per_node, nodes and time into a single line, we should exclude those in the main loop. We also exclude omp, since it will be set through an environment variable rather than a resource line.

Let’s also switch the tag and separator to the PBS format.

[7]:
class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # change our submitter and pragma to the right values
        self.submitter = "qsub"
        self.shebang = "#!/bin/bash"
        self.pragma = "#PBS"
        # PBS also uses a different format for the lines, lets set that too
        self.resource_tag = "-"
        self.resource_separator = " "
        # same resources, we're changing the FORMAT, not the CONTENT
        self.mpi = Resource(name="mpi_per_node", flag="ppn", default=4)
        self.omp = Resource(name="omp", flag="cpus-per-task", default=4)
        self.nodes = Resource(name="nodes", flag="nodes", default=1)
        self.time = Resource(name="time", flag="walltime", format="time", optional=False, default=3600)

    def parser(self, resources: "Resources") -> list:
        output = []
        for resource in resources:
            # exclude by name, we need to treat these separately
            if resource and resource.name not in ["mpi_per_node", "nodes", "time", "omp"]:
                output.append(resource.resource_line)

        return output

test = Computer()
print(test.script())
#!/bin/bash

That looks good: there are no longer any invalid lines in the output.

Important

It looks like the loop is doing “nothing”, since we’re excluding everything that we’ve added. However, it is still good practice to keep it, since it will catch any extra resources you choose to add in the future.
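The point about future resources can be sketched without the library. Here resources are reduced to hypothetical (name, line) pairs, and "queue" is an invented extra resource used only for illustration.

```python
EXCLUDED = frozenset({"mpi_per_node", "nodes", "time", "omp"})


def filter_lines(resources, excluded=EXCLUDED):
    """Return the line of every (name, line) pair whose name is not excluded."""
    return [line for name, line in resources if name not in excluded]


current = [("nodes", "#PBS -nodes 1"), ("time", "#PBS -walltime 01:00:00")]
# "queue" is a hypothetical resource added later; it passes straight through.
future = current + [("queue", "#PBS -q debug")]

print(filter_lines(current))
print(filter_lines(future))
```

With only the excluded resources defined, the loop contributes nothing. As soon as a new resource appears, it is emitted with no further changes to the parser.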

Now that we’ve cleaned the output, we can build the combined resource line and append it ourselves.

Let’s additionally grab the omp resource and export its value as OMP_NUM_THREADS.

We will also add the cd $PBS_O_WORKDIR line for good measure.

[8]:
class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.submitter = "qsub"
        self.shebang = "#!/bin/bash"
        self.pragma = "#PBS"

        self.resource_tag = "-"
        self.resource_separator = " "

        self.mpi = Resource(name="mpi_per_node", flag="ppn", default=4)
        self.omp = Resource(name="omp", flag="cpus-per-task", default=4)
        self.nodes = Resource(name="nodes", flag="nodes", default=1)
        self.time = Resource(name="time", flag="walltime", format="time", optional=False, default=3600)

    def parser(self, resources: "Resources") -> list:
        output = []
        for resource in resources:
            # exclude by name; omp is exported as an environment variable below
            if resource and resource.name not in ["mpi_per_node", "nodes", "time", "omp"]:
                output.append(resource.resource_line)
        # extract the values and format them before adding
        ppn = resources["mpi_per_node"]
        nodes = resources["nodes"]
        wtime = resources["time"]
        output.append(
            f"{resources.pragma} -l nodes={nodes}:"
            f"ppn={ppn},"
            f"walltime={wtime}"
        )
        # We can add extra important lines in here, too
        output.append("\ncd $PBS_O_WORKDIR")
        output.append(f"export OMP_NUM_THREADS={resources['omp']}")

        return output

test = Computer()
print(test.script())
#!/bin/bash
#PBS -l nodes=1:ppn=4,walltime=01:00:00

cd $PBS_O_WORKDIR
export OMP_NUM_THREADS=4

Tip

You can also use this to add resource lines that need no value, such as #SBATCH --exclusive for Slurm.
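A minimal sketch of that idea follows, with the parser reduced to a plain function over already-formatted lines. parser_with_exclusive is a hypothetical name used only for this illustration.

```python
def parser_with_exclusive(resource_lines, pragma="#SBATCH"):
    """Append a flag-only directive after the regular resource lines."""
    output = list(resource_lines)
    # --exclusive takes no value, so it is written as a fixed string.
    output.append(f"{pragma} --exclusive")
    return output


lines = parser_with_exclusive(["#SBATCH --nodes=1"])
print(lines)
```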

Accessing Resources

While the Resources object functions primarily as a list, you can also access the Resource objects like you would a dict.

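To demonstrate this, let’s go back to a simpler parser format for the sake of brevity.

Note

The key must be the Resource’s name parameter, not the attribute it is assigned to. For example, a resource created with self.mpi = Resource(name="mpi_per_node", ...) is accessed as resources["mpi_per_node"].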

[9]:
class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.submitter = "qsub"
        self.shebang = "#!/bin/bash"
        self.pragma = "#PBS"

        self.mpi = Resource(name="mpi_per_node", flag="ppn", default=4)
        self.omp = Resource(name="omp", flag="cpus-per-task", default=4)
        self.nodes = Resource(name="nodes", flag="nodes", default=1)
        self.time = Resource(name="time", flag="walltime", format="time", optional=False, default=3600)

    def parser(self, resources: "Resources") -> list:
        output = []
        for resource in resources:
            if resource:
                output.append(resource.resource_line)

        print(f"the value of the 'mpi' arg is {resources['mpi_per_node']}")

        return output

test = Computer()
test.mpi = 16
print(test.script())
the value of the 'mpi' arg is 16
#!/bin/bash
#PBS --ppn=16
#PBS --cpus-per-task=4
#PBS --nodes=1
#PBS --walltime=01:00:00

run_args

You can also access the run_args of the calling Dataset from within resources.

This is useful for extracting info such as the remote directory. For this we should create a Dataset:

[10]:
from remotemanager import Dataset

class Computer(BaseComputer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.submitter = "qsub"
        self.shebang = "#!/bin/bash"
        self.pragma = "#PBS"

        self.mpi = Resource(name="mpi_per_node", flag="ppn", default=4)
        self.omp = Resource(name="omp", flag="cpus-per-task", default=4)
        self.nodes = Resource(name="nodes", flag="nodes", default=1)
        self.time = Resource(name="time", flag="walltime", format="time", optional=False, default=3600)

    def parser(self, resources: "Resources") -> list:
        output = []
        for resource in resources:
            if resource:
                output.append(f"{resources.pragma} --{resource.flag}={resource.value}")

        output.append(f"export WORKDIR={resources['run_args']['remote_dir']}")

        return output


def f():
    return

ds = Dataset(f, url = Computer(), skip = False, mpi_per_node=32)

ds.append_run()

ds.run(dry_run=True)

print("\njobscript:")
print(ds.runners[0].jobscript.content)
appended run runner-0
Running Dataset
assessing run for runner dataset-06a84b6d-runner-0... running
launch command: cd temp_runner_remote && rm -f dataset-06a84b6d.manifest && sed -i -e "s#{rootdir}#$(pwd)#" dataset-06a84b6d-repo.sh && source dataset-06a84b6d-repo.sh && exec_and_log bash dataset-06a84b6d-master.sh

jobscript:
#!/bin/bash
#PBS --ppn=4
#PBS --cpus-per-task=4
#PBS --nodes=1
#PBS --walltime=01:00:00
export WORKDIR=temp_runner_remote


export DIR_e711be1e={run_rootdir}
source {run_rootdir}/dataset-06a84b6d-repo.sh

exec_and_log python dataset-06a84b6d-runner-0-run.py || write_to_log failed

The run_args of the Dataset are available under the run_args key as a dict.

It may be safer to use .get(..., None) here, since indexing with a key that is not present will raise an error and cause your parser to fail.
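The difference between the two access styles can be shown with a plain dict. workdir_line is a hypothetical helper, and the run_args dicts below are hand-written examples rather than real Dataset output.

```python
def workdir_line(run_args):
    """Return an export line for the remote directory, or None if it is unset."""
    # .get returns None instead of raising KeyError when the key is missing.
    remote_dir = run_args.get("remote_dir", None)
    if remote_dir is None:
        return None
    return f"export WORKDIR={remote_dir}"


present = workdir_line({"remote_dir": "temp_runner_remote"})
missing = workdir_line({})

print(present)
print(missing)
```

Inside a parser, the line would only be appended when the helper returns something other than None, so a missing argument simply leaves the export out.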